原理剖析: 一文搞懂 Kafka consumer 与 broker 交互机制与原理
01
前言
02
消费者的角色
◾ leader:特殊的一个 member,负责分配所有 member 到 topic partition 的映射;
◾ follower:除了 leader 以外的其他所有 member;
03
消费流程涉及的核心组件
◾ group coordinator:负责同步 consumer member 状态、监听心跳、触发 rebalance、挑选 consumer leader 等行为;
◾ replica manager: 负责 topic partition 副本的管理(读、写等);
◾ metadata:Kafka 集群的元信息;
◾ client:ConsumerNetworkClient 实例,负责网络层读写;
◾ assignors:consumer leader 中负责指定所有 consumer member 到 topic partition 的映射;
◾ coordinator:ConsumerCoordinator 实例,负责与 broker 侧的 group coordinator 交互;
◾ fetcher:负责拉取消息;
04
常用接口
Kafka 的 consumer 的常用接口:
◾ subscribe:指定 consumer 订阅的 topic
◾ poll:拉取消息;
◾ close:优雅退出 consumer;
◾ commit: 手动提交消费位点;
05
consumer 与 broker 交互流程解析
5.1 消费过程
◾ 加入 consumer group,获取负责的 topic partition;
◾ 基于负责的 topic partition,向所在的 broker 拉取消息;
5.1.1 加入 consumer group
5.1.1.1 FindCoordinator 阶段
Consumer 向最近通信过的 broker 发送 FindCoordinator 请求;
该 broker 根据 group.id 进行 hash,再对 __consumer_offsets 的 partition 数目取模,找到负责该 group 的 partition 后,返回 partition leader 所在的 broker 地址;
Consumer 从 FindCoordinator response 中解析出负责本 group 的 broker 的地址,后续 Consumer 侧的 coordinator 组件会与新 broker 通信,同步 consumer group 的状态;
注意:此时 consumer 还没有加入 group,HeartBeatThread 虽然启动了,但没有 enable,还不会向 GroupCoordinator 发送心跳。
5.1.1.2 JoinGroup 阶段
Consumer 发送 JoinGroup 请求;
GroupCoordinator 会检查 JoinGroup 请求的合法性。consumer 在构造的时候是没有 member id 的,因此 JoinGroup 请求中没有附上 member id。此时,GroupCoordinator 会为这个新 consumer 生成一个 member id,随 MEMBER_ID_REQUIRED 异常一并返回;
Consumer 填入 member id,再次发送 JoinGroup 请求;
GroupCoordinator 会在 JoinGroup response 中告知 consumer 当前 group leader 的 member id 以及 consumer 自己的 member id。对于 leader,会额外返回所有 consumer 的 member id,以便 leader 进行后续的 partition 分配工作。
提示1:一般来说,group 的 consumer leader 是第一个向 GroupCoordinator 发起 JoinGroup 请求的 consumer。
提示2:member id 是不可手动设置的。Consumer 侧有个类似的配置是 group.instance.id ,用于声明 consumer 为 静态 consumer。静态 consumer 与普通 consumer 的最大区别在于退出时不会发送 LeaveGroup 请求。在用户业务升级时, 普通 consumer 退出后再拉起会导致较频繁的 rebalance,静态 consumer 就可以规避这种情况(通常会搭配较大的 session timeout 配置)。
5.1.1.3 SyncGroup 阶段
在 consumer member 中分配 partition: 在收到 JoinGroup response 后,consumer group leader 会根据指定的 partition assignment strategy(由 partition.assignment.strategy 参数设置),进行 topic partition 在各个 member 中的分配。
consumer 执行 SyncGroup 请求:leader consumer 会发送 leader SyncGroup 请求,附上 topic partition 与 member 的映射结果;其他 member 会发送 follower SyncGroup 请求,尝试获取自己需要负责的 topic partition。
5.1.2 拉取消息
5.1.2.1 OffsetFetch 阶段
◾ 如果该 partition 查询到了 commited offset 记录,那么 consumer 会从该 offset 开始继续消费;
◾ 否则,根据 consumer 配置的 auto.offset.reset,决定起始消费位点。
5.1.2.2 ListOffset 阶段
5.1.2.3 Fetch 阶段
5.1.2.4 OffsetCommit 阶段
5.2 退出过程
Consumer 同步提交位点信息; 关闭 Heartbeat 线程; Consumer 发送 LeaveGroup 请求到 GroupCoordinator,但不会阻塞式等待 response; GroupCoordinator 收到 LeaveGroup 请求后,将 group 置为 rebalance 状态,触发该 group 中其他 member 的重平衡。
注意: 由于 Consumer 关闭时不会阻塞式等待 LeaveGroup 的 response,在“consuemr 关闭”和“group coordinator 清除该 Consumer 信息” 两个事件之间会存在一小段时间间隙。不等 response 的设计是为了加速 consumer 的关闭,即使 broker 没有收到 Consumer 发送的 LeaveGroup 请求,也会由于心跳超时被踢出 consumer group。
06
broker 侧 consumer group 状态管理
◾ Empty:group 没有 member,等待 offsets 信息失效。常作为初始状态;
◾ PreparingRebalance:rebalance 开始;前文中提到的 broker 会通知所有 member 重平衡,就是在这个状态下通知的;
◾ CompletingRebalance:等待 group leader 提交分配结果;
◾ Stable:group 稳态(所有 consumer 都在正常消费);
◾ Dead:group 没有 member,且 offsets 信息为空;Dead 是最终状态,不可转化为其他状态;
◾ Empty Group(没有 member) 的手动删除;
◾ Group metadata 失效(offsets 信息为空)。原因一般是定时任务清理掉了所有 offsets(已失效);
◾ OffsetsDelete 或 PartitionsDelete 之后,如果 offsets 被清空且 Group 是 Empty;
◾ GroupUnload,即__consumer_offsets 的某个 partition 的 leader 从本机切出去,将内存中 cache 的相关 Group metadata 置为 Dead;
注意:图上所谓“join completed”,指的是 rebalance 结束。rebalance 结束的原因可能是超时或者旧 member 都已经重新加入了。
07
rebalance 实现原理
broker 广播 rebalance 状态的方式:附着在 HeartbeatResponse 或者 OffsetCommitResponse 中,以 error code 形式告知 consumer 需要 rejoin group。
08
重平衡 Q&A
Q: PreparingRebalance 状态下是否会停止消费?A:当且仅当 consumer 感知到自己需要 rejoin group 才会停止消费。PreparingRebalance 状态下可以正常消费和提交位点。不过 CompletingRebalance 状态下不允许提交位点,会抛出 Errors.REBALANCE_IN_PROGRESS,触发 consumer 的 rejoin 动作。
Q: Consumer 手动 assign 和 rebalance 两种模式的区别?A:手动 assgin 模式使用的是 Kafka consumer 的 assign 接口:
consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));
rebalance 模式下,Kafka consumer 会订阅指定的 topic,使用的是 Kafka consumer 的 subscribe 接口:
consumer.subscribe(Collections.singleton("test-topic"));
◾ Topic partition 分配者不同:前者是在调用 assign 接口时手动指定的,后者是 consumer group leader 分配的;◾ 重平衡行为不同:手动 assign 时 Kafka consumer 跟 topic partition 是静态绑定的,Kafka consumer 不会参与重平衡;rebalance 模式会根据 consumer 加入、退出等情况触发重平衡,调整各个 Kafka consumer 分配到的 topic partition;◾ Group 元数据包含信息不同:assign 模式下的 group metadata 是没有 member 信息的,仅用于存储位点信息;
需要注意的是,两种模式互斥。assign 模式下,Kafka consumer 不支持动态扩容,当生产速率突增时,无法及时加入新的消费者来提升消费的速率。如果业务希望完全避免消费过程中出现 topic partition 漂移(一种可能的场景是,生产者将 user_id 作为 record key,且消费时要求只能有一个 consumer 处理同一个 user 的数据),那么才有必要考虑使用 assign 模式。此外,assign 模式还需要注意避免 group id 与其他 group id 碰撞,否则有可能导致 commited offset 的污染。
09
总结
往期推荐
AutoMQ 携手阿里云共同发布新一代云原生 Kafka,帮助得物有效压缩 85% Kafka 云支出!
2024-03-13
从 Redis 开源协议变更看开源软件与云计算巨头之间的竞争博弈
2024-03-27
原理剖析:AutoMQ 如何基于裸设备实现高性能的 WAL
2024-03-22
Kafka 痛点专题|AutoMQ 如何解决 Kafka 冷读副作用
2024-03-15
END
关于我们
我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。
🌟 GitHub 地址:https://github.com/AutoMQ/automq
💻 官网:https://www.automq.com
👀 B站:AutoMQ官方账号
🔍 视频号:AutoMQ
👉🏻 扫二维码
加入我们的社区群
关注我们,一起学习更多云原生技术干货!
👇🏻点击下方阅读原文,前往 GitHub 了解体验!